Dataset Usage

The Dataset Object

Dataset is the primary class of the package. It is a general-purpose “container” which stores your function, its runs, and their results/errors.

For the purposes of this tutorial, we will bring back the basic multiply function you may have seen in the quickstart guide, amended so that the delay time is adjustable:

[2]:
def multiply(a, b, t=1):
    import time

    time.sleep(t)

    return a * b

We can now set up a Dataset. The only required argument is the function, though there are many other optional arguments, most of which we shall also cover in this tutorial. See the Dataset API documentation for full details.

Again, for this tutorial we will be using a local URL; this enables the functions to run anywhere and be tested.

Note

At a basic level, the URL is a connection to your machine, and can be swapped out at any time to change machines. In theory, any function which runs on URL('machine.a') will also run just the same on URL('machine.b').
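
For example, assuming you already have ssh access configured to some other machine, switching the target is just a matter of building a different URL. The hostname below is a placeholder, not part of this tutorial:

from remotemanager import URL

# placeholder hostname; any machine you can reach over ssh should behave
# the same way that 'localhost' does in the cells below
remote_url = URL('remote.cluster.address')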

[3]:
import time
from remotemanager import Dataset, URL

url = URL('localhost')

ds = Dataset(function=multiply,
             url=url,
             script='#!/bin/bash',
             submitter='bash',
             local_dir='temp_ds_staging',
             remote_dir='temp_ds_remote',
             name='tutorial_dataset',
             skip=False)

The arguments shown here are likely to be the ones you will use most often. In short:

  • url: The remote connection. If it is not given, a default localhost “connection” will be created for you.

  • local_dir: The directory that will be used to “stage” your files before sending them to the remote. Defaults to temp_runner_local.

  • remote_dir: The remote directory where files will be sent. Defaults to temp_runner_remote.

  • name: Datasets can be named, which makes their files easier to locate. By default, files will simply use the uuid of the dataset/runner to differentiate them.

  • skip: Contextual argument. If set to False, it disables the Dataset init “skip”, forcing it to delete the existing database and start anew.
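
Since the function is the only required argument, a minimal sketch relying on the defaults described above (localhost connection, temp_runner_local/temp_runner_remote directories, uuid-based naming) would be:

from remotemanager import Dataset

# only the function is given; url, local_dir, remote_dir and name all fall
# back to the defaults listed above
ds_minimal = Dataset(function=multiply)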

Extra Variables

If you wish to run on a machine which has a scheduler system, you can use the script variable to pass your jobscript. There are also more advanced features in place to generate dynamic jobscripts; see the Scheduler Tutorial for more info.

You can also specify a run_dir, which will be an internal directory within remote_dir. By default this is not specified, and runs will execute directly within the remote_dir.

dbfile allows you to force the dataset to store its database within a specific filename, should you wish to keep track of this. Otherwise, it defaults to {self.name}-{self.short_uuid}.yaml.
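
A hedged sketch combining these extra variables (the scheduler header, run directory and database filename below are placeholders chosen for illustration):

from remotemanager import Dataset, URL

# placeholder jobscript header, internal run directory and explicit database
# filename; adjust all three for your own machine and scheduler
ds_sched = Dataset(function=multiply,
                   url=URL('localhost'),
                   script='#!/bin/bash\n#SBATCH --time=00:10:00',
                   run_dir='run',
                   dbfile='tutorial_extra_variables.yaml',
                   name='tutorial_extra_variables',
                   skip=False)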

Appending Runs

Before running your function, you must append runs containing the arguments to use.

Dataset.append_run() allows for run creation, and at minimum requires a dict containing the required arguments for your function.

So in our case, a dictionary containing arguments for a and b is necessary for a run to begin. As t has a default value of 1, it is optional. The structure below will append 3 runs displaying this behaviour:

Note

This is also true for runs that take no arguments; simply call append_run().

[4]:
runs = [{'a': 10, 'b': 5},
        {'a': 5.7, 'b': 8.4},
        {'a': 4, 'b': 4, 't': 6}]

for run in runs:
    ds.append_run(args=run)
appended run runner-0
appended run runner-1
appended run runner-2

Note

arguments is also accepted as an alias for args.

Additionally, if you wish to run scripts within unique folders, you can specify a run_dir when appending runs. If this attribute is present, the folder will be created within the remote dir and the function will be run from within it. You may need to adjust your scripts and additional files to suit this run behaviour.
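
As a sketch of both points, performed on a copy so the tutorial keeps its 3 runners (the directory name is a placeholder):

import copy

run_dir_example = copy.deepcopy(ds)

# this run would execute inside its own 'run_special' folder within remote_dir
run_dir_example.append_run(args={'a': 2, 'b': 3}, run_dir='run_special')

# the arguments alias from the note above should behave identically to args
run_dir_example.append_run(arguments={'a': 3, 'b': 4})

del run_dir_example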

The Runner object

Now we have a dataset which can be run to return our results. Before we do this, it is worth stepping through some useful debugging tools.

Firstly, how to query what runs you already have. This can be done by accessing the property Dataset.runners:

[5]:
ds.runners
[5]:
[tutorial_dataset-a2c088ba-runner-0,
 tutorial_dataset-a2c088ba-runner-1,
 tutorial_dataset-a2c088ba-runner-2]

There is also the runner_dict property, which returns the same information as a dict in {append id: runner} format.

[6]:
ds.runner_dict
[6]:
{'runner-0': tutorial_dataset-a2c088ba-runner-0,
 'runner-1': tutorial_dataset-a2c088ba-runner-1,
 'runner-2': tutorial_dataset-a2c088ba-runner-2}

Lazy Append

Added in version 0.8.4.

If you have a lot of runners to append (especially ones with large arguments), the base append_run can begin to slow down drastically. For such situations, you can call a context manager to wrap your run appends.

Here we copy the dataset (so as not to add too much bloat to the tutorial), then add 10 more runs:

[7]:
import copy
example_ds = copy.deepcopy(ds)

with example_ds.lazy_append() as la:
    for i in range(10):
        la.append_run({'a': i, 'b': 0})

print(len(example_ds.runners))

del example_ds
Of 13 appends: 13 appended
See get_append_log for more info
13

There is also a lazy option which does the same thing. However, once you are done appending runs, you must add a finish_append() call, which finalises the appends all at once as though they were appended normally.

Warning

Omitting the finish_append() after using a lazy append will not raise an error, but can cause strange behaviour.
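
A minimal sketch of that lazy workflow, assuming lazy is accepted as a keyword by append_run and that finish_append() is then called on the Dataset (again on a copy, so the tutorial keeps its 3 runners):

import copy

lazy_example = copy.deepcopy(ds)

for i in range(10):
    # assumption: lazy=True defers the per-append bookkeeping
    lazy_example.append_run({'a': i, 'b': 0}, lazy=True)

# without this call the appends are never finalised (see the warning above)
lazy_example.finish_append()

del lazy_example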

Running the Dataset

Running a dataset is done via the Dataset.run() method. This gives you one final opportunity to override any run arguments, as it provides another run_args catch for extra keyword arguments.

Note

Be aware of the argument expansion limitation that exists with rsync versions below 3.0.0. If you get errors during transfer, be sure to check that rsync --version reports 3.0.0 or above.

[8]:
ds.run()
Staging Dataset... Staged 3/3 Runners
Transferring for 3/3 Runners
Transferring 9 Files... Done
Remotely executing 3/3 Runners
[8]:
True

If you’re following along on your machine you may have noticed that this call completed instantly, yet our function has a time.sleep line in it. We would expect to have to wait 8s for this (1+1+6s delays).

This is because the dataset run defaults to asynchronous execution. As you can imagine, this can be changed by passing asynchronous as a run_arg wherever you wish.

Waiting for Completion

Calculations can take time. You have two (non-exclusive) options for dealing with this:

  • Leave the notebook for a while and rerun when you think the jobs have finished

  • Use wait

Rerunning the notebook at any time will cause the inbuilt skip methods to kick in and make sure that any running or completed jobs are not resubmitted. This means that you can submit and leave the notebook. At rerun, any fetch_results which failed before will grab the results this time.

Note

Rerunning the notebook works fine provided you have not specified skip=False or force=True anywhere.

You can also use the wait keyword. This is a one line wrapper for a block that looks similar to this:

interval = 2
timeout = 10

t0 = time.time()
while not ds.all_finished():
    time.sleep(interval)

    if time.time() - t0 > timeout:
        break

This periodically checks for completed runs every interval seconds. It is also a blocking call until ds.all_finished returns True, or more time than timeout has passed.

[9]:
ds.wait(interval=2, timeout=10)

The call here means to check every 2 seconds, and raise a timeout error after 10 total seconds have passed.

Note

By default, wait waits for any completion, including failures. You can restrict this to wait for a total success (timing out if there are failures) by passing success_only=True.
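
For example, a stricter wait using the same interval and timeout as above:

# times out if any runner fails, rather than counting failures as completion
ds.wait(interval=2, timeout=10, success_only=True)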

Asynchronous

Asynchronous behaviour also means that each runner runs simultaneously. This can put excess load on machines not designed for it, or it simply may not be what you want for your workflow. To avoid this, we can use asynchronous=False.

Additionally, here we must use the force=True keyword to ensure that the runs go through, as the previous runs are marked as complete. Be careful using this keyword in workflows with long jobs: if they are still running and complete before your more recent run, their results will be “injected”.

[11]:
ds.reset_runs(wipe=True, confirm=False)

t0 = time.perf_counter()

ds.run(asynchronous=False)

dt = time.perf_counter() - t0

# we expect that the synchronous run will take around 1+1+6=8s
expected_time = 8
# the test suite can take extra time here, need to leave ~2s of room
assert abs(dt - expected_time) < 2, f"run completed in {dt}s"

print(f"run completed in {dt:.2f}s")
Staging Dataset... Staged 0/3 Runners
Transferring for 3/3 Runners
Transferring 9 Files... Done
Remotely executing 3/3 Runners
run completed in 8.12s

While not particularly useful in a wide range of use cases, there may be situations where you want to wait for a short run to complete, and this also demonstrates the amending of run variables nicely.

One final way to set the run args is via the set_run_arg method:

[12]:
ds.set_run_arg('asynchronous', True)
print(ds.run_args["asynchronous"])

ds.set_run_arg('new_option', 'value!')
print(ds.run_args["new_option"])
True
value!

Collecting Results

There are functions intended to be used after a run has been called, to interact with the run, or with the results.

We shall cover:

  • is_finished

  • all_finished

  • fetch_results

  • results

  • errors

Dataset.is_finished

This property will return a boolean list of the is_finished method of the runners. Runners are considered finished when they have either returned a result, or failed with an error.

Dataset.all_finished

This property returns the all() of Dataset.is_finished
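
A minimal sketch of that relationship:

# all_finished is simply the aggregate of the per-runner is_finished values
assert ds.all_finished == all(ds.is_finished)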

To demonstrate these, we shall re-run and see what the state looks like at a few time intervals. But first, we must make sure that the results are not already present.

[13]:
print('wiping result files...')

# this function will clear any runner results and optionally wipe local files
ds.reset_runs(wipe=True, confirm=False)
wiping result files...

Let's add a run that will fail, to demonstrate how errors are handled:

[14]:
# we can't multiply an int by None, so this should fail
ds.append_run({'a': 0, 'b': None})
appended run runner-3
[15]:
time.sleep(1)  # this short sleep prevents earlier runs getting in the way

print('calcs launched, waiting before checking completion')
ds.run(asynchronous=True)

time.sleep(2)

print('\nafter 2s, state is now:')
print(ds.is_finished)
print('all_finished:', ds.all_finished)

time.sleep(5)

print('\nafter 7s, state is now:')
print(ds.is_finished)
print('all_finished:', ds.all_finished)
calcs launched, waiting before checking completion
Staging Dataset... Staged 1/4 Runners
Transferring for 4/4 Runners
Transferring 11 Files... Done
Remotely executing 4/4 Runners

after 2s, state is now:
[True, True, False, True]
all_finished: False

after 7s, state is now:
[True, True, True, True]
all_finished: True

It may seem counter-intuitive that the runs are all completed at 7s, but recall that they were launched asynchronously, so the whole run only takes around 6s (our maximum delay time).

The remaining functions

Dataset.fetch_results()

This function will attempt to grab any results from files or function objects that are attached to the dataset, storing them in the results property

Dataset.results

This property allows optimised access to the results of the previous run. When results is queried, it also checks to see if there are any errors, and warns you if any are found.

Dataset.errors

Similar to results, this stores a list of the error content if available.

[16]:
ds.fetch_results()
Fetching results
Transferring 7 Files... Done
[17]:
ds.results
Warning! Found 1 error(s), also check the `errors` property!
[17]:
[50,
 47.88,
 16,
 RunnerFailedError('TypeError: unsupported operand type(s) for *: 'int' and 'NoneType'')]
[18]:
ds.errors
[18]:
[None,
 None,
 None,
 "TypeError: unsupported operand type(s) for *: 'int' and 'NoneType'"]

Further features

While we touched on the runner availability earlier, we skipped over a feature which may be helpful for debugging purposes. The Runner object has a history property which returns a {time: state} dict containing information about all state changes the runner has experienced.

This runner has been run and rerun a few times now, so the history will be quite full. On a fresh Dataset, a flag will be set to wipe this history.

[19]:
ds.runners[0].history
[19]:
{'2025-06-06 13:51:07/0': 'created',
 '2025-06-06 13:51:07/1': 'staged',
 '2025-06-06 13:51:07/2': 'transferred',
 '2025-06-06 13:51:07/3': 'submit pending',
 '2025-06-06 13:51:07/4': 'submitted',
 '2025-06-06 13:51:07/5': 'started',
 '2025-06-06 13:51:08/0': 'completed',
 '2025-06-06 13:51:15/0': 'reset',
 '2025-06-06 13:51:15/1': 'transferred',
 '2025-06-06 13:51:15/2': 'submit pending',
 '2025-06-06 13:51:23/0': 'reset',
 '2025-06-06 13:51:24/0': 'transferred',
 '2025-06-06 13:51:24/1': 'submit pending',
 '2025-06-06 13:51:24/2': 'submitted',
 '2025-06-06 13:51:24/3': 'started',
 '2025-06-06 13:51:25/0': 'completed',
 '2025-06-06 13:51:31/0': 'satisfied'}

Here you can see the state history for the first runner in the list, showing the three runs, the creation time of the resultfile on the remote, and the final completion state where the results were loaded back into the runner.

If you just require a list of states (for example, checking if a runner has passed through a state), there is the property Runner.status_list
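
For example, a quick check of whether the first runner has ever passed through the submitted state:

# True if 'submitted' appears anywhere in this runner's state history
print('submitted' in ds.runners[0].status_list)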

Swapping out the serialiser

This is now covered in more depth in the dedicated tutorial.

Access to the commands used to execute the runs

Once you have run a dataset, you can access the command used to execute the bash scripts. This can be useful for debugging purposes.

[20]:
print('raw command:', ds.run_cmd.sent)
print('returned stdout:', ds.run_cmd.stdout)
print('returned stderr:', ds.run_cmd.stderr)
raw command: cd temp_ds_remote && bash tutorial_dataset-a2c088ba-master.sh
returned stdout:
returned stderr:

Running a single runner

While it was mentioned previously that the runners themselves should ideally not be touched, and all interaction should be done via the Dataset, it is possible to run a single runner if necessary.

Warning

This process is inefficient and should only be used if absolutely required. It may be preferable to clear the results of the offending runner using reset_runs() and rerun with skip=True.

[21]:
# store what the current last submission time is
last_submitted_initial = ds.runners[0].last_submitted
[22]:
ds.reset_runs(confirm=False)  # clear results to demonstrate

ds.runners[0].run(asynchronous=False)
time.sleep(1)
ds.fetch_results()
Staging Dataset... Staged 0/4 Runners
Transferring for 1/4 Runners
Transferring 5 Files... Done
Remotely executing 1/4 Runners
Fetching results
Transferring 2 Files... Done
[23]:
# get the new last submission time
last_submitted_after = ds.runners[0].last_submitted

This quick assertion makes sure that the runner that was resubmitted actually has a different submission time.

[24]:
assert last_submitted_initial != last_submitted_after

We can again demonstrate the use of check_all_runner_states here: as we have run only one runner, checking for full completion will return False. Obviously in this case all_finished would do the job, but you can query for any state, such as submitted.

[25]:
print(ds.check_all_runner_states('completed'))
False